Cloud One File Storage Securityの検出をSlackに通知する
こんにちは。たかやまです。
今回はCloud One File Storage Security(c1fss)のウイルス検出結果をSlackに通知する方法をご紹介したいと思います。
メール向けの通知についてはこちらのブログをご確認ください。
やってみた
Webhookの作成
こちらを参考に事前にWebhook URLを用意してください。
SAMデプロイ
c1fssのslack通知のプラグインはServerless Application Repositoryで公開されています。
今回はこちらのプラグインをご紹介します。
リンク先からDeploy
を選択します。
以下の項目を入力していきます。
SlackChannel
とSlackUsername
はカスタムインテグレーション*1のIncoming Webhookでのみ有効な値になります。Slack AppでIncomming Webhookを作成した場合は入力不要になります。
1 現在非推奨
- ScanResultTopicARN : c1fssストレージスタックのSNS Topic ARN
- SlackChannel : slack送信先チャンネル(カスタムインテグレーションでのみ有効)
- SlackURL : 作成したWebhook URL
- SlackUsername : slack通知で表示するユーザ名(カスタムインテグレーションでのみ有効)
アプリケーションが作成されていればOKです!
テスト検知
アプリケーションが作成されたらこちらのテスト検知方法を実施します。
ローカルPCにウイルス対策ソフトが入っていると、検知に使うeicarファイルが削除される可能性があります。なのでここではCloudshellを使ってeicarファイルをS3にアップロードしていきます。
curl -O https://secure.eicar.org/eicar.com aws s3 cp eicar.com s3://<YOUR_BUCKET>
ウイルス検知がされるとこのような通知が来ます。
(画像と名前はSlack側で設定しています)
供養
実はこのSAMに気づかず、自分で通知用のLambdaを書いていました...
せっかくなので作成したスクリプトを供養がてらこちらに載せておきます。
import json import logging import os from datetime import datetime, timedelta, timezone import urllib3 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) JST = timezone(timedelta(hours=+9), "JST") http = urllib3.PoolManager() url = os.environ.get("WEBHOOK_URL") def lambda_handler(event, context): sns_json = json.loads(event["Records"][0]["Sns"]["Message"]) logger.info(sns_json) payload = check_message(sns_json) if payload: post_slack(url, json.dumps(payload)) else: pass def check_message(sns_json: dict): file_url = sns_json["file_url"] timestamp = datetime.fromtimestamp(sns_json["timestamp"], JST).strftime( "%Y-%m-%d %H:%M:%S" ) if sns_json["scanner_status"] == 0 and sns_json["scanning_result"]["Findings"]: logger.info("c1fss virus detected") findings = json.dumps(sns_json["scanning_result"]["Findings"][0], indent=2) return { "attachments": [ { "color": "#e60033", "blocks": [ { "type": "header", "text": { "type": "plain_text", "text": ":rotating_light:Virus detected", "emoji": True, }, }, { "type": "section", "text": { "type": "mrkdwn", "text": f"File URL: <{file_url}|{file_url}>", }, }, { "type": "section", "fields": [ { "type": "mrkdwn", "text": f"Finding:\n }, {"type": "mrkdwn", "text": f"Timestamp:\n {timestamp}"}, ], }, ], } ] } elif sns_json["scanner_status"] != 0: logger.info("c1fss error status") status_message = sns_json["scanner_status_message"] return { "attachments": [ { "color": "#e60033", "blocks": [ { "type": "header", "text": { "type": "plain_text", "text": ":rotating_light:Error Status", "emoji": True, }, }, { "type": "section", "text": { "type": "mrkdwn", "text": f"File URL: <{file_url}|{file_url}>", }, }, { "type": "section", "fields": [ { "type": "mrkdwn", "text": f"Error:\n {status_message}", }, {"type": "mrkdwn", "text": f"Timestamp:\n {timestamp}"}, ], }, ], } ] } else: logger.info("c1fss successful scan") def post_slack(url, payload): try: res = http.request("POST", url, body=payload) except res.exceptions.RequestExceptioexceptions.RequestExceptionn as e: logger.error(e) raise else: logger.info({"request_result": {"status": res.status, "data": res.data}})
通知内容はこんな感じ。
ウイルス検知時以外に、エラーコード検知時も通知するようにしています。
今回ご紹介したSAMはウイルス検知時のみ通知する内容のため、c1fssエラー時にも通知したい場合は上記のようなコードをSAM内のLambdaに追記いただければと思います。
c1fssの通知形式はこちらになります。
最後に
Trend Micro製品は結構AWSでテンプレートを提供していることがあるので先に調べてみるのがよいかもしれません。使えるものは使っていきましょう!私のような悲しみをうまないために。。
以上、たかやま(@nyan_kotaroo)でした。